/*
 * Binance Spot WebSocket Streams
 *
 * OpenAPI Specifications for the Binance Spot WebSocket Streams
 *
 * API documents:
 * - [Github web-socket-streams documentation file](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md)
 * - [General API information for web-socket-streams on website](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams)
 *
 *
 * The version of the OpenAPI document: 1.0.0
 *
 *
 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
 * https://openapi-generator.tech
 * Do not edit the class manually.
 */

#![allow(unused_imports)]
use async_trait::async_trait;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};

use crate::common::{
    models::ParamBuildError,
    utils::replace_websocket_streams_placeholders,
    websocket::{WebsocketBase, WebsocketStream, WebsocketStreams, create_stream_handler},
};
use crate::spot::websocket_streams::models;

#[async_trait]
pub trait WebSocketStreamsApi: Send + Sync {
    async fn agg_trade(
        &self,
        params: AggTradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>>;
    async fn all_market_rolling_window_ticker(
        &self,
        params: AllMarketRollingWindowTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>;
    async fn all_mini_ticker(
        &self,
        params: AllMiniTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>>;
    async fn all_ticker(
        &self,
        params: AllTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllTickerResponseInner>>>>;
    async fn avg_price(
        &self,
        params: AvgPriceParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>>;
    async fn book_ticker(
        &self,
        params: BookTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>>;
    async fn diff_book_depth(
        &self,
        params: DiffBookDepthParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>>;
    async fn kline(
        &self,
        params: KlineParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>>;
    async fn kline_offset(
        &self,
        params: KlineOffsetParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>>;
    async fn mini_ticker(
        &self,
        params: MiniTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>>;
    async fn partial_book_depth(
        &self,
        params: PartialBookDepthParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>>;
    async fn rolling_window_ticker(
        &self,
        params: RollingWindowTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>>;
    async fn ticker(
        &self,
        params: TickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>>;
    async fn trade(
        &self,
        params: TradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>>;
}

pub struct WebSocketStreamsApiClient {
    websocket_streams_base: Arc<WebsocketStreams>,
}

impl WebSocketStreamsApiClient {
    pub fn new(websocket_streams_base: Arc<WebsocketStreams>) -> Self {
        Self {
            websocket_streams_base,
        }
    }
}

#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AllMarketRollingWindowTickerWindowSizeEnum {
    #[serde(rename = "1h")]
    WindowSize1h,
    #[serde(rename = "4h")]
    WindowSize4h,
    #[serde(rename = "1d")]
    WindowSize1d,
}

impl AllMarketRollingWindowTickerWindowSizeEnum {
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::WindowSize1h => "1h",
            Self::WindowSize4h => "4h",
            Self::WindowSize1d => "1d",
        }
    }
}

impl std::str::FromStr for AllMarketRollingWindowTickerWindowSizeEnum {
    type Err = Box<dyn std::error::Error + Send + Sync>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1h" => Ok(Self::WindowSize1h),
            "4h" => Ok(Self::WindowSize4h),
            "1d" => Ok(Self::WindowSize1d),
            other => Err(format!(
                "invalid AllMarketRollingWindowTickerWindowSizeEnum: {}",
                other
            )
            .into()),
        }
    }
}

#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KlineIntervalEnum {
    #[serde(rename = "1s")]
    Interval1s,
    #[serde(rename = "1m")]
    Interval1m,
    #[serde(rename = "3m")]
    Interval3m,
    #[serde(rename = "5m")]
    Interval5m,
    #[serde(rename = "15m")]
    Interval15m,
    #[serde(rename = "30m")]
    Interval30m,
    #[serde(rename = "1h")]
    Interval1h,
    #[serde(rename = "2h")]
    Interval2h,
    #[serde(rename = "4h")]
    Interval4h,
    #[serde(rename = "6h")]
    Interval6h,
    #[serde(rename = "8h")]
    Interval8h,
    #[serde(rename = "12h")]
    Interval12h,
    #[serde(rename = "1d")]
    Interval1d,
    #[serde(rename = "3d")]
    Interval3d,
    #[serde(rename = "1w")]
    Interval1w,
    #[serde(rename = "1M")]
    Interval1M,
}

impl KlineIntervalEnum {
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Interval1s => "1s",
            Self::Interval1m => "1m",
            Self::Interval3m => "3m",
            Self::Interval5m => "5m",
            Self::Interval15m => "15m",
            Self::Interval30m => "30m",
            Self::Interval1h => "1h",
            Self::Interval2h => "2h",
            Self::Interval4h => "4h",
            Self::Interval6h => "6h",
            Self::Interval8h => "8h",
            Self::Interval12h => "12h",
            Self::Interval1d => "1d",
            Self::Interval3d => "3d",
            Self::Interval1w => "1w",
            Self::Interval1M => "1M",
        }
    }
}

impl std::str::FromStr for KlineIntervalEnum {
    type Err = Box<dyn std::error::Error + Send + Sync>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1s" => Ok(Self::Interval1s),
            "1m" => Ok(Self::Interval1m),
            "3m" => Ok(Self::Interval3m),
            "5m" => Ok(Self::Interval5m),
            "15m" => Ok(Self::Interval15m),
            "30m" => Ok(Self::Interval30m),
            "1h" => Ok(Self::Interval1h),
            "2h" => Ok(Self::Interval2h),
            "4h" => Ok(Self::Interval4h),
            "6h" => Ok(Self::Interval6h),
            "8h" => Ok(Self::Interval8h),
            "12h" => Ok(Self::Interval12h),
            "1d" => Ok(Self::Interval1d),
            "3d" => Ok(Self::Interval3d),
            "1w" => Ok(Self::Interval1w),
            "1M" => Ok(Self::Interval1M),
            other => Err(format!("invalid KlineIntervalEnum: {}", other).into()),
        }
    }
}

#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KlineOffsetIntervalEnum {
    #[serde(rename = "1s")]
    Interval1s,
    #[serde(rename = "1m")]
    Interval1m,
    #[serde(rename = "3m")]
    Interval3m,
    #[serde(rename = "5m")]
    Interval5m,
    #[serde(rename = "15m")]
    Interval15m,
    #[serde(rename = "30m")]
    Interval30m,
    #[serde(rename = "1h")]
    Interval1h,
    #[serde(rename = "2h")]
    Interval2h,
    #[serde(rename = "4h")]
    Interval4h,
    #[serde(rename = "6h")]
    Interval6h,
    #[serde(rename = "8h")]
    Interval8h,
    #[serde(rename = "12h")]
    Interval12h,
    #[serde(rename = "1d")]
    Interval1d,
    #[serde(rename = "3d")]
    Interval3d,
    #[serde(rename = "1w")]
    Interval1w,
    #[serde(rename = "1M")]
    Interval1M,
}

impl KlineOffsetIntervalEnum {
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Interval1s => "1s",
            Self::Interval1m => "1m",
            Self::Interval3m => "3m",
            Self::Interval5m => "5m",
            Self::Interval15m => "15m",
            Self::Interval30m => "30m",
            Self::Interval1h => "1h",
            Self::Interval2h => "2h",
            Self::Interval4h => "4h",
            Self::Interval6h => "6h",
            Self::Interval8h => "8h",
            Self::Interval12h => "12h",
            Self::Interval1d => "1d",
            Self::Interval3d => "3d",
            Self::Interval1w => "1w",
            Self::Interval1M => "1M",
        }
    }
}

impl std::str::FromStr for KlineOffsetIntervalEnum {
    type Err = Box<dyn std::error::Error + Send + Sync>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1s" => Ok(Self::Interval1s),
            "1m" => Ok(Self::Interval1m),
            "3m" => Ok(Self::Interval3m),
            "5m" => Ok(Self::Interval5m),
            "15m" => Ok(Self::Interval15m),
            "30m" => Ok(Self::Interval30m),
            "1h" => Ok(Self::Interval1h),
            "2h" => Ok(Self::Interval2h),
            "4h" => Ok(Self::Interval4h),
            "6h" => Ok(Self::Interval6h),
            "8h" => Ok(Self::Interval8h),
            "12h" => Ok(Self::Interval12h),
            "1d" => Ok(Self::Interval1d),
            "3d" => Ok(Self::Interval3d),
            "1w" => Ok(Self::Interval1w),
            "1M" => Ok(Self::Interval1M),
            other => Err(format!("invalid KlineOffsetIntervalEnum: {}", other).into()),
        }
    }
}

#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartialBookDepthLevelsEnum {
    #[serde(rename = "5")]
    Levels5,
    #[serde(rename = "10")]
    Levels10,
    #[serde(rename = "20")]
    Levels20,
}

impl PartialBookDepthLevelsEnum {
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Levels5 => "5",
            Self::Levels10 => "10",
            Self::Levels20 => "20",
        }
    }
}

impl std::str::FromStr for PartialBookDepthLevelsEnum {
    type Err = Box<dyn std::error::Error + Send + Sync>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "5" => Ok(Self::Levels5),
            "10" => Ok(Self::Levels10),
            "20" => Ok(Self::Levels20),
            other => Err(format!("invalid PartialBookDepthLevelsEnum: {}", other).into()),
        }
    }
}

#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RollingWindowTickerWindowSizeEnum {
    #[serde(rename = "1h")]
    WindowSize1h,
    #[serde(rename = "4h")]
    WindowSize4h,
    #[serde(rename = "1d")]
    WindowSize1d,
}

impl RollingWindowTickerWindowSizeEnum {
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::WindowSize1h => "1h",
            Self::WindowSize4h => "4h",
            Self::WindowSize1d => "1d",
        }
    }
}

impl std::str::FromStr for RollingWindowTickerWindowSizeEnum {
    type Err = Box<dyn std::error::Error + Send + Sync>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1h" => Ok(Self::WindowSize1h),
            "4h" => Ok(Self::WindowSize4h),
            "1d" => Ok(Self::WindowSize1d),
            other => Err(format!("invalid RollingWindowTickerWindowSizeEnum: {}", other).into()),
        }
    }
}

/// Request parameters for the [`agg_trade`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`agg_trade`](#method.agg_trade).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AggTradeParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl AggTradeParams {
    /// Create a builder for [`agg_trade`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> AggTradeParamsBuilder {
        AggTradeParamsBuilder::default().symbol(symbol)
    }
}
/// Request parameters for the [`all_market_rolling_window_ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`all_market_rolling_window_ticker`](#method.all_market_rolling_window_ticker).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllMarketRollingWindowTickerParams {
    ///
    /// The `window_size` parameter.
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub window_size: AllMarketRollingWindowTickerWindowSizeEnum,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl AllMarketRollingWindowTickerParams {
    /// Create a builder for [`all_market_rolling_window_ticker`].
    ///
    /// Required parameters:
    ///
    /// * `window_size` — String
    ///
    #[must_use]
    pub fn builder(
        window_size: AllMarketRollingWindowTickerWindowSizeEnum,
    ) -> AllMarketRollingWindowTickerParamsBuilder {
        AllMarketRollingWindowTickerParamsBuilder::default().window_size(window_size)
    }
}
/// Request parameters for the [`all_mini_ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`all_mini_ticker`](#method.all_mini_ticker).
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllMiniTickerParams {
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl AllMiniTickerParams {
    /// Create a builder for [`all_mini_ticker`].
    ///
    #[must_use]
    pub fn builder() -> AllMiniTickerParamsBuilder {
        AllMiniTickerParamsBuilder::default()
    }
}
/// Request parameters for the [`all_ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`all_ticker`](#method.all_ticker).
#[derive(Clone, Debug, Builder, Default)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AllTickerParams {
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl AllTickerParams {
    /// Create a builder for [`all_ticker`].
    ///
    #[must_use]
    pub fn builder() -> AllTickerParamsBuilder {
        AllTickerParamsBuilder::default()
    }
}
/// Request parameters for the [`avg_price`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`avg_price`](#method.avg_price).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct AvgPriceParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl AvgPriceParams {
    /// Create a builder for [`avg_price`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> AvgPriceParamsBuilder {
        AvgPriceParamsBuilder::default().symbol(symbol)
    }
}
/// Request parameters for the [`book_ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`book_ticker`](#method.book_ticker).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct BookTickerParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl BookTickerParams {
    /// Create a builder for [`book_ticker`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> BookTickerParamsBuilder {
        BookTickerParamsBuilder::default().symbol(symbol)
    }
}
/// Request parameters for the [`diff_book_depth`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`diff_book_depth`](#method.diff_book_depth).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct DiffBookDepthParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
    /// 1000ms or 100ms
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub update_speed: Option<String>,
}

impl DiffBookDepthParams {
    /// Create a builder for [`diff_book_depth`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> DiffBookDepthParamsBuilder {
        DiffBookDepthParamsBuilder::default().symbol(symbol)
    }
}
/// Request parameters for the [`kline`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`kline`](#method.kline).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KlineParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    ///
    /// The `interval` parameter.
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub interval: KlineIntervalEnum,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl KlineParams {
    /// Create a builder for [`kline`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    /// * `interval` — String
    ///
    #[must_use]
    pub fn builder(symbol: String, interval: KlineIntervalEnum) -> KlineParamsBuilder {
        KlineParamsBuilder::default()
            .symbol(symbol)
            .interval(interval)
    }
}
/// Request parameters for the [`kline_offset`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`kline_offset`](#method.kline_offset).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct KlineOffsetParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    ///
    /// The `interval` parameter.
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub interval: KlineOffsetIntervalEnum,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl KlineOffsetParams {
    /// Create a builder for [`kline_offset`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    /// * `interval` — String
    ///
    #[must_use]
    pub fn builder(symbol: String, interval: KlineOffsetIntervalEnum) -> KlineOffsetParamsBuilder {
        KlineOffsetParamsBuilder::default()
            .symbol(symbol)
            .interval(interval)
    }
}
/// Request parameters for the [`mini_ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`mini_ticker`](#method.mini_ticker).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct MiniTickerParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl MiniTickerParams {
    /// Create a builder for [`mini_ticker`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> MiniTickerParamsBuilder {
        MiniTickerParamsBuilder::default().symbol(symbol)
    }
}
/// Request parameters for the [`partial_book_depth`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`partial_book_depth`](#method.partial_book_depth).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct PartialBookDepthParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    ///
    /// The `levels` parameter.
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub levels: PartialBookDepthLevelsEnum,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
    /// 1000ms or 100ms
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub update_speed: Option<String>,
}

impl PartialBookDepthParams {
    /// Create a builder for [`partial_book_depth`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    /// * `levels` — String
    ///
    #[must_use]
    pub fn builder(
        symbol: String,
        levels: PartialBookDepthLevelsEnum,
    ) -> PartialBookDepthParamsBuilder {
        PartialBookDepthParamsBuilder::default()
            .symbol(symbol)
            .levels(levels)
    }
}
/// Request parameters for the [`rolling_window_ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`rolling_window_ticker`](#method.rolling_window_ticker).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct RollingWindowTickerParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    ///
    /// The `window_size` parameter.
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub window_size: RollingWindowTickerWindowSizeEnum,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl RollingWindowTickerParams {
    /// Create a builder for [`rolling_window_ticker`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    /// * `window_size` — String
    ///
    #[must_use]
    pub fn builder(
        symbol: String,
        window_size: RollingWindowTickerWindowSizeEnum,
    ) -> RollingWindowTickerParamsBuilder {
        RollingWindowTickerParamsBuilder::default()
            .symbol(symbol)
            .window_size(window_size)
    }
}
/// Request parameters for the [`ticker`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`ticker`](#method.ticker).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TickerParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl TickerParams {
    /// Create a builder for [`ticker`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> TickerParamsBuilder {
        TickerParamsBuilder::default().symbol(symbol)
    }
}
/// Request parameters for the [`trade`] operation.
///
/// This struct holds all of the inputs you can pass when calling
/// [`trade`](#method.trade).
#[derive(Clone, Debug, Builder)]
#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
pub struct TradeParams {
    /// Symbol to query
    ///
    /// This field is **required.
    #[builder(setter(into))]
    pub symbol: String,
    /// Unique WebSocket request ID.
    ///
    /// This field is **optional.
    #[builder(setter(into), default)]
    pub id: Option<String>,
}

impl TradeParams {
    /// Create a builder for [`trade`].
    ///
    /// Required parameters:
    ///
    /// * `symbol` — Symbol to query
    ///
    #[must_use]
    pub fn builder(symbol: String) -> TradeParamsBuilder {
        TradeParamsBuilder::default().symbol(symbol)
    }
}

#[async_trait]
impl WebSocketStreamsApi for WebSocketStreamsApiClient {
    async fn agg_trade(
        &self,
        params: AggTradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::AggTradeResponse>>> {
        let AggTradeParams { symbol, id } = params;

        let pairs: &[(&str, Option<String>)] =
            &[("symbol", Some(symbol.clone())), ("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);

        Ok(create_stream_handler::<models::AggTradeResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn all_market_rolling_window_ticker(
        &self,
        params: AllMarketRollingWindowTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMarketRollingWindowTickerResponseInner>>>>
    {
        let AllMarketRollingWindowTickerParams { window_size, id } = params;

        let pairs: &[(&str, Option<String>)] = &[
            ("windowSize", Some(window_size.as_str().to_string())),
            ("id", id.clone()),
        ];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);

        Ok(
            create_stream_handler::<Vec<models::AllMarketRollingWindowTickerResponseInner>>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                id_opt,
            )
            .await,
        )
    }

    async fn all_mini_ticker(
        &self,
        params: AllMiniTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllMiniTickerResponseInner>>>> {
        let AllMiniTickerParams { id } = params;

        let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);

        Ok(
            create_stream_handler::<Vec<models::AllMiniTickerResponseInner>>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                id_opt,
            )
            .await,
        )
    }

    async fn all_ticker(
        &self,
        params: AllTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<Vec<models::AllTickerResponseInner>>>> {
        let AllTickerParams { id } = params;

        let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/!ticker@arr", &vars);

        Ok(
            create_stream_handler::<Vec<models::AllTickerResponseInner>>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                id_opt,
            )
            .await,
        )
    }

    async fn avg_price(
        &self,
        params: AvgPriceParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::AvgPriceResponse>>> {
        let AvgPriceParams { symbol, id } = params;

        let pairs: &[(&str, Option<String>)] =
            &[("symbol", Some(symbol.clone())), ("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);

        Ok(create_stream_handler::<models::AvgPriceResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn book_ticker(
        &self,
        params: BookTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::BookTickerResponse>>> {
        let BookTickerParams { symbol, id } = params;

        let pairs: &[(&str, Option<String>)] =
            &[("symbol", Some(symbol.clone())), ("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);

        Ok(create_stream_handler::<models::BookTickerResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn diff_book_depth(
        &self,
        params: DiffBookDepthParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::DiffBookDepthResponse>>> {
        let DiffBookDepthParams {
            symbol,
            id,
            update_speed,
        } = params;

        let pairs: &[(&str, Option<String>)] = &[
            ("symbol", Some(symbol.clone())),
            ("id", id.clone()),
            ("updateSpeed", update_speed.clone()),
        ];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

        Ok(create_stream_handler::<models::DiffBookDepthResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn kline(
        &self,
        params: KlineParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineResponse>>> {
        let KlineParams {
            symbol,
            interval,
            id,
        } = params;

        let pairs: &[(&str, Option<String>)] = &[
            ("symbol", Some(symbol.clone())),
            ("interval", Some(interval.as_str().to_string())),
            ("id", id.clone()),
        ];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);

        Ok(create_stream_handler::<models::KlineResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn kline_offset(
        &self,
        params: KlineOffsetParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::KlineOffsetResponse>>> {
        let KlineOffsetParams {
            symbol,
            interval,
            id,
        } = params;

        let pairs: &[(&str, Option<String>)] = &[
            ("symbol", Some(symbol.clone())),
            ("interval", Some(interval.as_str().to_string())),
            ("id", id.clone()),
        ];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream =
            replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);

        Ok(create_stream_handler::<models::KlineOffsetResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn mini_ticker(
        &self,
        params: MiniTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::MiniTickerResponse>>> {
        let MiniTickerParams { symbol, id } = params;

        let pairs: &[(&str, Option<String>)] =
            &[("symbol", Some(symbol.clone())), ("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);

        Ok(create_stream_handler::<models::MiniTickerResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn partial_book_depth(
        &self,
        params: PartialBookDepthParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::PartialBookDepthResponse>>> {
        let PartialBookDepthParams {
            symbol,
            levels,
            id,
            update_speed,
        } = params;

        let pairs: &[(&str, Option<String>)] = &[
            ("symbol", Some(symbol.clone())),
            ("levels", Some(levels.as_str().to_string())),
            ("id", id.clone()),
            ("updateSpeed", update_speed.clone()),
        ];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream =
            replace_websocket_streams_placeholders("/<symbol>@depth<levels>@<updateSpeed>", &vars);

        Ok(create_stream_handler::<models::PartialBookDepthResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn rolling_window_ticker(
        &self,
        params: RollingWindowTickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::RollingWindowTickerResponse>>> {
        let RollingWindowTickerParams {
            symbol,
            window_size,
            id,
        } = params;

        let pairs: &[(&str, Option<String>)] = &[
            ("symbol", Some(symbol.clone())),
            ("windowSize", Some(window_size.as_str().to_string())),
            ("id", id.clone()),
        ];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);

        Ok(
            create_stream_handler::<models::RollingWindowTickerResponse>(
                WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
                stream,
                id_opt,
            )
            .await,
        )
    }

    async fn ticker(
        &self,
        params: TickerParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TickerResponse>>> {
        let TickerParams { symbol, id } = params;

        let pairs: &[(&str, Option<String>)] =
            &[("symbol", Some(symbol.clone())), ("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);

        Ok(create_stream_handler::<models::TickerResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }

    async fn trade(
        &self,
        params: TradeParams,
    ) -> anyhow::Result<Arc<WebsocketStream<models::TradeResponse>>> {
        let TradeParams { symbol, id } = params;

        let pairs: &[(&str, Option<String>)] =
            &[("symbol", Some(symbol.clone())), ("id", id.clone())];

        let vars: HashMap<_, _> = pairs
            .iter()
            .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
            .collect();

        let id_opt: Option<String> = vars.get("id").map(std::string::ToString::to_string);

        let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);

        Ok(create_stream_handler::<models::TradeResponse>(
            WebsocketBase::WebsocketStreams(Arc::clone(&self.websocket_streams_base)),
            stream,
            id_opt,
        )
        .await)
    }
}

#[cfg(all(test, feature = "spot"))]
mod tests {
    use super::*;
    use crate::TOKIO_SHARED_RT;
    use crate::{
        common::websocket::{WebsocketConnection, WebsocketHandler},
        config::ConfigurationWebsocketStreams,
    };
    use serde_json::json;
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::task::yield_now;

    async fn make_streams_base() -> (Arc<WebsocketStreams>, Arc<WebsocketConnection>) {
        let conn = WebsocketConnection::new("test");
        let config = ConfigurationWebsocketStreams::builder()
            .build()
            .expect("Failed to build configuration");
        let streams_base = WebsocketStreams::new(config, vec![conn.clone()]);
        conn.set_handler(streams_base.clone() as Arc<dyn WebsocketHandler>)
            .await;
        (streams_base, conn)
    }

    #[test]
    fn agg_trade_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AggTradeParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let AggTradeParams { symbol, id } = params.clone();

            let pairs: &[(&str, Option<String>)] =
                &[("symbol", Some(symbol.clone())), ("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);
            let ws_stream = api
                .agg_trade(params)
                .await
                .expect("agg_trade should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn agg_trade_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let AggTradeParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);

            let ws_stream = api.agg_trade(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::AggTradeResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn agg_trade_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AggTradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let AggTradeParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@aggTrade", &vars);

            let ws_stream = api.agg_trade(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::AggTradeResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"aggTrade","E":1672515782136,"s":"BNBBTC","a":12345,"p":"0.001","q":"100","f":100,"l":105,"T":1672515782136,"m":true,"M":true}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn all_market_rolling_window_ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllMarketRollingWindowTickerParams::builder(
                AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,
            )
            .id(Some(id.clone()))
            .build()
            .unwrap();

            let AllMarketRollingWindowTickerParams { window_size, id } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("windowSize", Some(window_size.as_str().to_string())),
                ("id", id.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);
            let ws_stream = api
                .all_market_rolling_window_ticker(params)
                .await
                .expect("all_market_rolling_window_ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn all_market_rolling_window_ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();

            let AllMarketRollingWindowTickerParams {
                window_size,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("windowSize",
                        Some(window_size.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);

            let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn all_market_rolling_window_ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllMarketRollingWindowTickerParams::builder(AllMarketRollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();

            let AllMarketRollingWindowTickerParams {
                window_size,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("windowSize",
                        Some(window_size.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!ticker_<windowSize>@arr", &vars);

            let ws_stream = api.all_market_rolling_window_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: Vec<models::AllMarketRollingWindowTickerResponseInner>| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"[{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}]"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn all_mini_ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllMiniTickerParams::builder()
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let AllMiniTickerParams { id } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);
            let ws_stream = api
                .all_mini_ticker(params)
                .await
                .expect("all_mini_ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn all_mini_ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();

            let AllMiniTickerParams {
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);

            let ws_stream = api.all_mini_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn all_mini_ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllMiniTickerParams::builder().id(Some(id.clone())).build().unwrap();

            let AllMiniTickerParams {
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!miniTicker@arr", &vars);

            let ws_stream = api.all_mini_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: Vec<models::AllMiniTickerResponseInner>| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"[{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}]"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn all_ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllTickerParams::builder()
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let AllTickerParams { id } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!ticker@arr", &vars);
            let ws_stream = api
                .all_ticker(params)
                .await
                .expect("all_ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn all_ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllTickerParams::builder().id(Some(id.clone())).build().unwrap();

            let AllTickerParams {
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!ticker@arr", &vars);

            let ws_stream = api.all_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: Vec<models::AllTickerResponseInner>| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"[{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}]"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn all_ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AllTickerParams::builder().id(Some(id.clone())).build().unwrap();

            let AllTickerParams {
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/!ticker@arr", &vars);

            let ws_stream = api.all_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: Vec<models::AllTickerResponseInner>| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"[{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}]"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn avg_price_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AvgPriceParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let AvgPriceParams { symbol, id } = params.clone();

            let pairs: &[(&str, Option<String>)] =
                &[("symbol", Some(symbol.clone())), ("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);
            let ws_stream = api
                .avg_price(params)
                .await
                .expect("avg_price should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn avg_price_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let AvgPriceParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);

            let ws_stream = api.avg_price(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn avg_price_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = AvgPriceParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let AvgPriceParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@avgPrice", &vars);

            let ws_stream = api.avg_price(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::AvgPriceResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"avgPrice","E":1693907033000,"s":"BTCUSDT","i":"5m","w":"25776.86000000","T":1693907032213}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn book_ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = BookTickerParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let BookTickerParams { symbol, id } = params.clone();

            let pairs: &[(&str, Option<String>)] =
                &[("symbol", Some(symbol.clone())), ("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);
            let ws_stream = api
                .book_ticker(params)
                .await
                .expect("book_ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn book_ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let BookTickerParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);

            let ws_stream = api.book_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::BookTickerResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn book_ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = BookTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let BookTickerParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@bookTicker", &vars);

            let ws_stream = api.book_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::BookTickerResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"u":400900217,"s":"BNBUSDT","b":"25.35190000","B":"31.21000000","a":"25.36520000","A":"40.66000000"}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn diff_book_depth_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = DiffBookDepthParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let DiffBookDepthParams {
                symbol,
                id,
                update_speed,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("id", id.clone()),
                ("updateSpeed", update_speed.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream =
                replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);
            let ws_stream = api
                .diff_book_depth(params)
                .await
                .expect("diff_book_depth should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn diff_book_depth_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = DiffBookDepthParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let DiffBookDepthParams {
                symbol,id,update_speed,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
                ("updateSpeed",
                        update_speed.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

            let ws_stream = api.diff_book_depth(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn diff_book_depth_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = DiffBookDepthParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let DiffBookDepthParams {
                symbol,id,update_speed,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
                ("updateSpeed",
                        update_speed.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@depth@<updateSpeed>", &vars);

            let ws_stream = api.diff_book_depth(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::DiffBookDepthResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"depthUpdate","E":1672515782136,"s":"BNBBTC","U":157,"u":160,"b":[["0.0024","10"]],"a":[["0.0026","100"]]}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn kline_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = KlineParams::builder("bnbusdt".to_string(), KlineIntervalEnum::Interval1s)
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let KlineParams {
                symbol,
                interval,
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("interval", Some(interval.as_str().to_string())),
                ("id", id.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream =
                replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);
            let ws_stream = api
                .kline(params)
                .await
                .expect("kline should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn kline_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = KlineParams::builder("bnbusdt".to_string(),KlineIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();

            let KlineParams {
                symbol,interval,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("interval",
                        Some(interval.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);

            let ws_stream = api.kline(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::KlineResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn kline_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = KlineParams::builder("bnbusdt".to_string(),KlineIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();

            let KlineParams {
                symbol,interval,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("interval",
                        Some(interval.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>", &vars);

            let ws_stream = api.kline(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::KlineResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn kline_offset_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = KlineOffsetParams::builder(
                "bnbusdt".to_string(),
                KlineOffsetIntervalEnum::Interval1s,
            )
            .id(Some(id.clone()))
            .build()
            .unwrap();

            let KlineOffsetParams {
                symbol,
                interval,
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("interval", Some(interval.as_str().to_string())),
                ("id", id.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream =
                replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);
            let ws_stream = api
                .kline_offset(params)
                .await
                .expect("kline_offset should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn kline_offset_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = KlineOffsetParams::builder("bnbusdt".to_string(),KlineOffsetIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();

            let KlineOffsetParams {
                symbol,interval,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("interval",
                        Some(interval.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);

            let ws_stream = api.kline_offset(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn kline_offset_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = KlineOffsetParams::builder("bnbusdt".to_string(),KlineOffsetIntervalEnum::Interval1s,).id(Some(id.clone())).build().unwrap();

            let KlineOffsetParams {
                symbol,interval,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("interval",
                        Some(interval.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@kline_<interval>@+08:00", &vars);

            let ws_stream = api.kline_offset(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::KlineOffsetResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"kline","E":1672515782136,"s":"BNBBTC","k":{"t":1672515780000,"T":1672515839999,"s":"BNBBTC","i":"1m","f":100,"L":200,"o":"0.0010","c":"0.0020","h":"0.0025","l":"0.0015","v":"1000","n":100,"x":false,"q":"1.0000","V":"500","Q":"0.500","B":"123456"}}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn mini_ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = MiniTickerParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let MiniTickerParams { symbol, id } = params.clone();

            let pairs: &[(&str, Option<String>)] =
                &[("symbol", Some(symbol.clone())), ("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);
            let ws_stream = api
                .mini_ticker(params)
                .await
                .expect("mini_ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn mini_ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = MiniTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let MiniTickerParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);

            let ws_stream = api.mini_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn mini_ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = MiniTickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let MiniTickerParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@miniTicker", &vars);

            let ws_stream = api.mini_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::MiniTickerResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC","c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18"}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn partial_book_depth_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = PartialBookDepthParams::builder(
                "bnbusdt".to_string(),
                PartialBookDepthLevelsEnum::Levels5,
            )
            .id(Some(id.clone()))
            .build()
            .unwrap();

            let PartialBookDepthParams {
                symbol,
                levels,
                id,
                update_speed,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("levels", Some(levels.as_str().to_string())),
                ("id", id.clone()),
                ("updateSpeed", update_speed.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders(
                "/<symbol>@depth<levels>@<updateSpeed>",
                &vars,
            );
            let ws_stream = api
                .partial_book_depth(params)
                .await
                .expect("partial_book_depth should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn partial_book_depth_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = PartialBookDepthParams::builder(
                "bnbusdt".to_string(),
                PartialBookDepthLevelsEnum::Levels5,
            )
            .id(Some(id.clone()))
            .build()
            .unwrap();

            let PartialBookDepthParams {
                symbol,
                levels,
                id,
                update_speed,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("levels", Some(levels.as_str().to_string())),
                ("id", id.clone()),
                ("updateSpeed", update_speed.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders(
                "/<symbol>@depth<levels>@<updateSpeed>",
                &vars,
            );

            let ws_stream = api.partial_book_depth(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(
                r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
            )
            .unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(
                called.load(Ordering::SeqCst),
                "expected our callback to have been invoked"
            );
        });
    }

    #[test]
    fn partial_book_depth_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = PartialBookDepthParams::builder(
                "bnbusdt".to_string(),
                PartialBookDepthLevelsEnum::Levels5,
            )
            .id(Some(id.clone()))
            .build()
            .unwrap();

            let PartialBookDepthParams {
                symbol,
                levels,
                id,
                update_speed,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("levels", Some(levels.as_str().to_string())),
                ("id", id.clone()),
                ("updateSpeed", update_speed.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders(
                "/<symbol>@depth<levels>@<updateSpeed>",
                &vars,
            );

            let ws_stream = api.partial_book_depth(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::PartialBookDepthResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(
                streams_base.is_subscribed(&stream).await,
                "should be subscribed before unsubscribe"
            );

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(
                r#"{"lastUpdateId":160,"bids":[["0.0024","10"]],"asks":[["0.0026","100"]]}"#,
            )
            .unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(
                !called.load(Ordering::SeqCst),
                "callback should not be invoked after unsubscribe"
            );
        });
    }

    #[test]
    fn rolling_window_ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = RollingWindowTickerParams::builder(
                "bnbusdt".to_string(),
                RollingWindowTickerWindowSizeEnum::WindowSize1h,
            )
            .id(Some(id.clone()))
            .build()
            .unwrap();

            let RollingWindowTickerParams {
                symbol,
                window_size,
                id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol", Some(symbol.clone())),
                ("windowSize", Some(window_size.as_str().to_string())),
                ("id", id.clone()),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream =
                replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);
            let ws_stream = api
                .rolling_window_ticker(params)
                .await
                .expect("rolling_window_ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn rolling_window_ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = RollingWindowTickerParams::builder("bnbusdt".to_string(),RollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();

            let RollingWindowTickerParams {
                symbol,window_size,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("windowSize",
                        Some(window_size.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);

            let ws_stream = api.rolling_window_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn rolling_window_ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = RollingWindowTickerParams::builder("bnbusdt".to_string(),RollingWindowTickerWindowSizeEnum::WindowSize1h,).id(Some(id.clone())).build().unwrap();

            let RollingWindowTickerParams {
                symbol,window_size,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("windowSize",
                        Some(window_size.as_str().to_string())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker_<windowSize>", &vars);

            let ws_stream = api.rolling_window_ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::RollingWindowTickerResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"1hTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","o":"0.0010","h":"0.0025","l":"0.0010","c":"0.0025","w":"0.0018","v":"10000","q":"18","O":0,"C":1675216573749,"F":0,"L":18150,"n":18151}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn ticker_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = TickerParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let TickerParams { symbol, id } = params.clone();

            let pairs: &[(&str, Option<String>)] =
                &[("symbol", Some(symbol.clone())), ("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);
            let ws_stream = api
                .ticker(params)
                .await
                .expect("ticker should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn ticker_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = TickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let TickerParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);

            let ws_stream = api.ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::TickerResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn ticker_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = TickerParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let TickerParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@ticker", &vars);

            let ws_stream = api.ticker(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::TickerResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"24hrTicker","E":1672515782136,"s":"BNBBTC","p":"0.0015","P":"250.00","w":"0.0018","x":"0.0009","c":"0.0025","Q":"10","b":"0.0024","B":"10","a":"0.0026","A":"100","o":"0.0010","h":"0.0025","l":"0.0010","v":"10000","q":"18","O":0,"C":86400000,"F":0,"L":18150,"n":18151}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }

    #[test]
    fn trade_should_execute_successfully() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, _) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = TradeParams::builder("bnbusdt".to_string())
                .id(Some(id.clone()))
                .build()
                .unwrap();

            let TradeParams { symbol, id } = params.clone();

            let pairs: &[(&str, Option<String>)] =
                &[("symbol", Some(symbol.clone())), ("id", id.clone())];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);
            let ws_stream = api
                .trade(params)
                .await
                .expect("trade should return a WebsocketStream");

            assert!(
                streams_base.is_subscribed(&stream).await,
                "expected stream '{stream}' to be subscribed"
            );
            assert_eq!(ws_stream.id.as_deref(), Some("test-id-123"));
        });
    }

    #[test]
    fn trade_should_handle_incoming_message() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = TradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let TradeParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);

            let ws_stream = api.trade(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_with_message = called.clone();
            ws_stream.on_message(move |_payload: models::TradeResponse| {
                called_with_message.store(true, Ordering::SeqCst);
            });

            let payload: Value = serde_json::from_str(r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;
            yield_now().await;

            assert!(called.load(Ordering::SeqCst), "expected our callback to have been invoked");
        });
    }

    #[test]
    fn trade_should_not_fire_after_unsubscribe() {
        TOKIO_SHARED_RT.block_on(async {
            let (streams_base, conn) = make_streams_base().await;
            let api = WebSocketStreamsApiClient::new(streams_base.clone());

            let id = "test-id-123".to_string();

            let params = TradeParams::builder("bnbusdt".to_string(),).id(Some(id.clone())).build().unwrap();

            let TradeParams {
                symbol,id,
            } = params.clone();

            let pairs: &[(&str, Option<String>)] = &[
                ("symbol",
                        Some(symbol.clone())
                ),
                ("id",
                        id.clone()
                ),
            ];

            let vars: HashMap<_, _> = pairs
                .iter()
                .filter_map(|&(k, ref v)| v.clone().map(|v| (k, v)))
                .collect();
            let stream = replace_websocket_streams_placeholders("/<symbol>@trade", &vars);

            let ws_stream = api.trade(params).await.unwrap();

            let called = Arc::new(AtomicBool::new(false));
            let called_clone = called.clone();
            ws_stream.on_message(move |_payload: models::TradeResponse| {
                called_clone.store(true, Ordering::SeqCst);
            });

            assert!(streams_base.is_subscribed(&stream).await, "should be subscribed before unsubscribe");

            ws_stream.unsubscribe().await;

            let payload: Value = serde_json::from_str(r#"{"e":"trade","E":1672515782136,"s":"BNBBTC","t":12345,"p":"0.001","q":"100","T":1672515782136,"m":true,"M":true}"#).unwrap();
            let msg = json!({
                "stream": stream,
                "data": payload,
            });

            streams_base.on_message(msg.to_string(), conn.clone()).await;

            yield_now().await;

            assert!(!called.load(Ordering::SeqCst), "callback should not be invoked after unsubscribe");
        });
    }
}
